package com.dec.kks.etl.spark_stream_ch_kafka;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

/**
 * <p/>
 * <li>Description:Hdfs utils</li>
 * <li>@author: zhongzhi</li>
 * <li>Date: 2018/5/8 14:50</li>
 */
public class HdfsUtils implements Serializable {
    /** 日志记录 */
    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsUtils.class);


    /**
     * 获取hdfs的文件系统对象
     *
     * @return FileSystem
     * @throws IOException 打开HDFS文件系统异常
     */
    public static FileSystem getFileSystem() throws IOException {
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        Configuration conf = new Configuration();
        //return FileSystem.get(URI.create("hdfs://bigdata-master1.phmcluster.calabar:8020"),conf);
        return FileSystem.get(conf);
    }

    /**
     * 创建目录，可指定存在的目录是否覆盖
     *
     * @param dirPath 目录路径
     * @return 是否创建成功状态
     */
    public static boolean mkdirs(String dirPath) {
        boolean result = false;
        FileSystem fs = null;
        try {
            fs = HdfsUtils.getFileSystem();
            Path path = new Path(dirPath);
            boolean exists = fs.exists(path);
            if (exists) {
                LOGGER.warn("{} 目录已经存在！", dirPath);
            } else {
                result = fs.mkdirs(path);
            }
        } catch (Exception e) {
            LOGGER.error("创建目录失败！", e);
        } finally {
            //HdfsUtils.close(fs);
        }

        return result;
    }

    /**
     * 创建一个空文件, 之后只能进行数据追加
     *
     * @param filePath hdfs文件路径
     * @return 是否创建成功状态
     */
    public static boolean createEmptyFile(String filePath) {
        boolean result = false;
        FileSystem fs = null;
        try {
            fs = HdfsUtils.getFileSystem();
            if (fs.exists(new Path(filePath))) {
                LOGGER.warn("hdfs 目录：{} 已经存在空文件！", filePath);
            } else {
                result = fs.createNewFile(new Path(filePath));
            }
        } catch (Exception e) {
            LOGGER.error("创建一个空文件失败！", e);
        } finally {
            //HdfsUtils.close(fs);
        }

        return result;
    }

    /**
     * hdfs 目录下是否有文件
     *
     * @param path path
     * @return true:目录下有文件
     */
    public static boolean hasFileInDir(String path) {
        boolean result = false;
        FileSystem fs = null;
        try {
            fs = HdfsUtils.getFileSystem();
            FileStatus[] fileStatuses = fs.listStatus(new Path(path));
            result = ArrayUtils.isNotEmpty(fileStatuses);
        } catch (Exception e) {
            LOGGER.error("获取 {} 下的文件失败！", e);
        } finally {
            //close(fs);
        }

        return result;
    }

    /**
     * 判断文件是否存在
     *
     * @param path path
     * @return true:存在
     */
    public static boolean exists(String path) {
        boolean result = false;
        FileSystem fs = null;
        try {
            fs = HdfsUtils.getFileSystem();
            result = fs.exists(new Path(path));
        } catch (Exception e) {
            LOGGER.error("查看 {} 文件是否存在出错！", e);
        } finally {
            //close(fs);
        }

        return result;
    }


    /**
     * 删除hdfs上目录
     *
     * @param path path
     * @return true:删除成功
     */
    public static boolean rm(String path) {
        boolean result = false;
        FileSystem fs = null;
        try {
            fs = HdfsUtils.getFileSystem();
            if (fs.exists(new Path(path))) {
                result = fs.delete(new Path(path), true);
            } else {
                LOGGER.warn("HDFS上目录：{} 不存在！", path);
            }
        } catch (Exception e) {
            LOGGER.error("删除HDFS上目录：{} 失败！", path, e);
        } finally {
            //close(fs);
        }

        return result;
    }

    private static void close(FileSystem fs) {
        if (null != fs) {
            try {
                fs.close();
            } catch (IOException e) {
                LOGGER.error("关闭 FileSystem 失败！", e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(exists("/sparkstreaming/dec/loader/tsdb"));
        System.out.println(mkdirs("/sparkstreaming/dec/loader/tsdb"));
        System.out.println(createEmptyFile("/sparkstreaming/dec/loader/tsdb/start"));
        System.out.println(createEmptyFile("/sparkstreaming/dec/loader/tsdb/stop"));
        System.out.println(rm("/sparkstreaming/dec/loader/tsdb"));
    }
}
